#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <netdb.h>
#include <sys/epoll.h>

#include "iscsi.h"
#include "iser.h"
#include "iser_rdma.h"
#include "core.h"
#include "rdma_event.h"

/* RDMA membuf pool sizing: buffer count scales with the max queued commands,
 * each buffer holding one default-sized RDMA transfer. */
int membuf_num = 16 * ISER_MAX_QUEUE_CMD;
size_t membuf_size = DEFAULT_RDMA_TRANSFER_SIZE;

/* Dispatch-table entry: one completion handler per iser_ib_op_code
 * (see __request_op__ / handle_wc below). */
struct iser_op_t {
        void (*iser_req)(struct iser_work_req *req);
};
/* iser transport ops; defined at the bottom of this file. */
static struct iscsi_transport iscsi_iser;

/*
 * Resolve the numeric host string for @addr into a freshly allocated
 * copy stored in *@name.  The caller owns the string and must free() it.
 *
 * Returns 0 on success, EINVAL/ENOMEM or a getnameinfo() error code
 * otherwise.
 */
static int iser_get_host_name(struct sockaddr_storage *addr, char **name)
{
        char host[NI_MAXHOST];
        int err;

        if (!name) {
                err = EINVAL;
                GOTO(err_ret, err);
        }

        err = getnameinfo((struct sockaddr *) addr, sizeof(*addr),
                          host, sizeof(host), NULL, 0, NI_NUMERICHOST);
        if (err)
                GOTO(err_ret, err);

        if ((*name = strdup(host)) == NULL) {
                err = ENOMEM;
                GOTO(err_ret, err);
        }

        return 0;
err_ret:
        return err;
}

/* Map an iser IB op code to a short human-readable name (for logging). */
static char *iser_ib_op_to_str(enum iser_ib_op_code iser_ib_op)
{
        switch (iser_ib_op) {
        case ISER_IB_RECV:
                return "recv";
        case ISER_IB_SEND:
                return "send";
        case ISER_IB_RDMA_WRITE:
                return "rdma_wr";
        case ISER_IB_RDMA_READ:
                return "rdma_rd";
        default:
                return "Unknown";
        }
}

/*
 * Handle a PDU received while the connection is not yet in full-feature
 * phase.  During any login/security state only ISCSI_OP_LOGIN_REQ is
 * accepted; anything else is a protocol violation.  PDUs arriving while
 * the connection is closing are silently ignored.
 *
 * Returns 0 on success/ignore, -EINVAL on a non-login PDU during login.
 */
static int iser_rx_handler_non_ff(struct iser_task *task)
{
        int ret = 0;
        struct iser_conn *conn = task->conn;

        switch (conn->h.state) {
        case STATE_START:
        case STATE_READY:
        case STATE_SECURITY:
        case STATE_SECURITY_AUTH:
        case STATE_SECURITY_DONE:
        case STATE_SECURITY_LOGIN:
        case STATE_SECURITY_FULL:
        case STATE_LOGIN:
        case STATE_LOGIN_FULL:
                if (task->opcode == ISCSI_OP_LOGIN_REQ) {
                        DINFO("login rx, conn:%p\n", conn);
                        /* NOTE(review): only the inline path below drops this
                         * reference; when scheduled, the drop presumably
                         * happens inside the iser_login_rx task — confirm. */
                        conn->h.refcount ++;
                        if (schedule_self())
                                schedule_task_new("iser_login_rx", iser_login_rx, task, -1);
                        else {
                                iser_login_rx((void *)task);
                                conn->h.refcount --;
                                YASSERT(conn->h.refcount >= 0);
                        }
                } else {
                        DERROR("non-login pdu during login phase, "
                                "conn:%p opcode:0x%0x\n",
                                conn, task->opcode);
                        ret = -EINVAL;
                }
                break;
        case STATE_EXIT:
        case STATE_CLOSED:
                /* teardown already in progress: drop the PDU on the floor */
                DINFO("ignored rx, while conn:%p closing\n", conn);
                break;
        default:
                DINFO("ignored rx, conn:%p unexpected state:%ld\n",
                        conn, conn->h.state);
                break;
        }
        return ret;
}

/*
 * Parse the iSER header and iSCSI BHS of a freshly received PDU and
 * cache the interesting fields (remote stags/VAs, opcode, tag, CmdSN)
 * on the task.
 *
 * Returns 0 for a valid ISER_ISCSI_CTRL PDU, -1 otherwise (Hello or a
 * malformed iser header).  Note that the iSCSI-level task fields after
 * the switch are populated unconditionally, even when -1 is returned.
 */
inline static int iser_parse_req_headers(struct iser_task *task)
{
        int ret = -1;
        struct iser_conn *conn = task->conn;
        struct iser_hdr *iser_hdr = task->pdu.iser_hdr;
        struct iscsi_hdr *iscsi_hdr = task->pdu.bhs;
        unsigned pdu_dlength = ntoh24(iscsi_hdr->datasize);
        unsigned pdu_len = pdu_dlength + sizeof(struct iscsi_hdr);

        switch (iser_hdr->flags & 0xF0) {
        case ISER_ISCSI_CTRL:
                /* NOTE(review): the ISER_RSV/ISER_WSV flag guards were
                 * deliberately disabled here — both stag/VA pairs are read
                 * unconditionally.  Confirm initiators always populate them. */
                        task->rem_read_stag =
                            be32_to_cpu(iser_hdr->read_stag);
                        task->rem_read_va = be64_to_cpu(iser_hdr->read_va);
                        DBUG("task:%p rstag:0x%x va:0x%"PRIx64 "\n", task,
                                task->rem_read_stag, task->rem_read_va);
                        task->rem_write_stag =
                            be32_to_cpu(iser_hdr->write_stag);
                        task->rem_write_va =
                            be64_to_cpu(iser_hdr->write_va);
                        DBUG("task:%p wstag:0x%x va:0x%"PRIx64 "\n", task,
                                task->rem_write_stag, task->rem_write_va);
                ret = 0;
                break;
        case ISER_HELLO:
                DINFO("iSER Hello message??\n");
                break;
        default:
                DERROR("malformed iser iser_hdr, flags 0x%02x\n",
                        iser_hdr->flags);
                break;
        }

        task->opcode = iscsi_hdr->opcode & ISCSI_OPCODE_MASK;
        task->is_immediate = iscsi_hdr->opcode & ISCSI_OP_IMMEDIATE ? 1 : 0;
        task->is_read = 0; /* valid for cmds only */
        task->is_write = 0; /* valid for cmds only */

        /* skip the AHS: membuf addr now points at the data segment */
        task->pdu.ahssize = iscsi_hdr->ahssize * 4;
        task->pdu.membuf.addr += task->pdu.ahssize;
        pdu_len += task->pdu.ahssize;
        task->pdu.membuf.size = pdu_dlength;
        task->pdu.membuf.rdma = 0;

        task->tag = iscsi_hdr->itt;
        task->cmd_sn = be32_to_cpu(iscsi_hdr->sn);
        conn->h.exp_stat_sn = be32_to_cpu(iscsi_hdr->exp_sn);

        return ret;
}

/*
 * Receive-completion handler: parse the iSER/iSCSI headers of the
 * incoming PDU and route it by opcode.  Drops the conn reference taken
 * when the recv buffer was posted.  Any error closes the connection.
 */
static void iser_rx_handler(struct iser_work_req *rxd)
{
        int ret = 0;
        struct iser_task *task = rxd->task;
        struct iser_conn *conn = task->conn;
        int queue_task = 1;

        iser_conn_put(conn);

        ret = iser_parse_req_headers(task);
        if (unlikely(ret))
                goto out;

        /* Every non-FULL state — including the closing states — goes through
         * the login/non-ff path.  (A later STATE_CLOSE/STATE_CLOSED check
         * that used to sit below this point was unreachable and has been
         * removed.) */
        if (unlikely(conn->h.state != STATE_FULL)) {
                ret = iser_rx_handler_non_ff(task);
                goto out;
        }

        INIT_LIST_HEAD(&task->in_buf_list);
        task->in_buf_num = 0;
        INIT_LIST_HEAD(&task->out_buf_list);
        task->out_buf_num = 0;

        if (likely(task->opcode == ISCSI_OP_SCSI_REQ)) {
                ret = iser_scsi_cmd_rx(task);
        } else {
                switch (task->opcode) {
                case ISCSI_OP_SCSI_DATA_OUT:
                        /* data-out PDUs attach to an existing task */
                        ret = iser_data_out_rx(task);
                        queue_task = 0;
                        break;
                case ISCSI_OP_NOP_OUT:
                        ret = iser_nop_out_rx(task);
                        break;
                case ISCSI_OP_LOGOUT_REQ:
                        DINFO("logout rx\n");
                        break;
                case ISCSI_OP_SCSI_TASK_MGT_REQ:
                        DINFO("tmfunc rx\n");
                        break;
                case ISCSI_OP_TEXT_REQ:
                        /* text requests are delivered directly, not queued */
                        DINFO("text rx\n");
                        ret = iser_task_delivery(task);
                        queue_task = 0;
                        break;
                case ISCSI_OP_SNACK_REQ:
                        DERROR("Cannot handle SNACK yet\n");
                        ret = -EINVAL;
                        break;
                default:
                        DERROR("Unknown op 0x%x\n", task->opcode);
                        ret = -EINVAL;
                        break;
                }
        }
        if (likely(!ret && queue_task)) {
                ret = iser_task_queue(conn->h.session, task);
        }
out:
        if (unlikely(ret)) {
                DERROR("conn:%p task:%p err:%d, closing\n",
                        conn, task, ret);
                iser_conn_close(conn);
        }
}

/*
 * Completion of a SEND work request.  Drops the conn reference taken at
 * post time, unlinks the task from conn->sent_list and completes it.
 * Unsolicited tasks (target-initiated NOP-IN, per the nop_in_task field)
 * are recycled rather than completed.
 */
static void iser_tx_complete_handler(struct iser_work_req *txd)
{
        struct iser_task *task = txd->task;
        struct iser_conn *conn = task->conn;
        int opcode = task->pdu.bhs->opcode & ISCSI_OPCODE_MASK;

        DBUG("conn:%p task:%p tag:0x%04"PRIx64 " opcode:0x%x\n",
                conn, task, task->tag, opcode);
        iser_conn_put(conn);

        list_del(&task->tx_list); /* remove from conn->sent_list */

        /* recycle the unsolicited task for the next target-initiated ping */
        if (unlikely(task->unsolicited)) {
                conn->nop_in_task = task;
                return;
        }

        iser_task_complete(task);

        /* during login run the conn-level login-tx path; in full-feature
         * phase repost a recv buffer for the next inbound PDU */
        if (unlikely(conn->h.state != STATE_FULL))
                iser_login_tx_complete(conn);
        else
                schedule_post_recv(task, conn);
}

/*
 * Completion of an RDMA-WR work request (data-in towards the initiator).
 * Only drops the conn reference taken when the rdma-wr was posted; the
 * task stays on conn->sent_list and is unlinked later in
 * iser_tx_complete_handler(), because every rdma-wr is followed by a send.
 */
static void iser_rdma_wr_complete_handler(struct iser_work_req *rdmad)
{
        struct iser_task *wr_task = rdmad->task;
        struct iser_conn *wr_conn = wr_task->conn;

        DBUG("conn:%p task:%p tag:0x%04"PRIx64 "\n",
                wr_conn, wr_task, wr_task->tag);
        iser_conn_put(wr_conn);
}

/*
 * Completion of an RDMA-RD work request (data-out pulled from the
 * initiator).  Accounts the transferred bytes and, once both the rdma
 * and unsolicited data are fully in, schedules I/O submission.
 */
static void iser_rdma_rd_complete_handler(struct iser_work_req *rdmad)
{
        struct iser_task *task = rdmad->task;
        struct iser_conn *conn = task->conn;

        /* sge[0].length is the byte count this work request transferred */
        task->rdma_rd_remains -= rdmad->sge[0].length;
        DBUG("conn:%p task:%p tag:0x%04"PRIx64 ", rems rdma:%d unsol:%d\n",
                conn, task, task->tag, task->rdma_rd_remains,
                task->unsol_remains);
        iser_conn_put(conn);

        /* no need to remove from a list, was removed before rdma-rd */

        /* connection is tearing down / not in full-feature: drop silently */
        if (unlikely(conn->h.state != STATE_FULL))
                return;

        if (likely(task->rdma_rd_remains == 0 && task->unsol_remains == 0))
                schedule_task_iosubmit(task, conn);
}

static struct iser_op_t  __request_op__[ISER_IB_OP_END] = {
        {iser_rx_handler},
        {iser_tx_complete_handler},
        {iser_rdma_wr_complete_handler},
        {iser_rdma_rd_complete_handler}
};
/*
 * Deal with just one work completion.
 */
/*
 * Deal with just one (successful) work completion: recover the posted
 * work request from wr_id and dispatch to the per-opcode handler via
 * __request_op__.  req->iser_ib_op is trusted to be a valid index
 * (< ISER_IB_OP_END); failed completions go to handle_wc_error().
 */
inline void handle_wc(struct ibv_wc *wc)
{
        struct iser_work_req *req = ptr_from_int64(wc->wr_id);

        DBUG(" %s complete, wr_id:%p len:%d\n",
                iser_ib_op_to_str(req->iser_ib_op), req, wc->byte_len);

        __request_op__[req->iser_ib_op].iser_req(req);
}

/*
 * Deal with one FAILED work completion.  Completes the task when the
 * failed op is the last one of its chain (SEND, RDMA-RD), then drops
 * the reference taken at post time and closes the connection.
 *
 * Fix: @task may be NULL (e.g. a flushed recv with no task attached) —
 * the old code guarded conn with "task ?" but then dereferenced
 * task->tag and passed task to iser_task_complete() unconditionally.
 */
void handle_wc_error(struct ibv_wc *wc)
{
        struct iser_work_req *req = ptr_from_int64(wc->wr_id);
        struct iser_task *task = req->task;
        struct iser_conn *conn = task ? task->conn : NULL;

        if (wc->status != IBV_WC_WR_FLUSH_ERR) {
                DERROR("conn:%p task:%p tag:0x%04"PRIx64 " wr_id:0x%p op:%s "
                        "err:%s vendor_err:0x%0x\n",
                        conn, task, task ? task->tag : 0, req,
                        iser_ib_op_to_str(req->iser_ib_op),
                        ibv_wc_status_str(wc->status), wc->vendor_err);
        }
        /* flush errors are the expected storm during teardown: stay quiet */

        switch (req->iser_ib_op) {
        case ISER_IB_SEND:
                /* in both read and write tasks SEND is last,
                 * the task should be completed now */
        case ISER_IB_RDMA_READ:
                /* RDMA-RD is sent separately, and Response
                 * is to be SENT after its completion, so if RDMA-RD fails,
                 * task to be completed now */
                if (task)
                        iser_task_complete(task);
                break;
        case ISER_IB_RECV:
                /* this should be the Flush, no task has been created yet */
        case ISER_IB_RDMA_WRITE:
                /* RDMA-WR and SEND response of a READ task
                 * are sent together, so when receiving RDMA-WR error,
                 * wait until SEND error arrives to complete the task */
                break;
        default:
                DERROR("unexpected opcode %d, "
                        "wc:%p wr_id:%p task:%p conn:%p\n",
                        wc->opcode, wc, req, task, conn);
                break;
        }

        if (conn) {
                iser_conn_put(conn);
                iser_conn_close(conn);
        }
}

static int __iser_cm_connect_request__(struct rdma_cm_event *ev, void *core)
{
        int ret, dev_found;
        struct rdma_cm_id *cm_id = ev->id;
        struct iser_conn *conn = NULL;
        struct iser_device *dev;
        core_t *_core = core;

        struct rdma_conn_param conn_param = {
                .responder_resources = 1,
                .initiator_depth = 1,
                .retry_count = 5,
        };

        /* find device */
        dev_found = iser_dev_find(&dev, cm_id);
        if (!dev_found) {
                ret = iser_dev_create(&dev, cm_id);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else if (_core) {
                //YASSERT(_core->iser_dev == dev);
        }

        ret = iser_conn_create(&conn);
        if (ret)
                GOTO(err_ret, ret);

        if (_core) {
                conn->private_mem = _core->tls[VARIABLE_CORE];
        } else {
                conn->private_mem = NULL;
        }
        /* relate iser and rdma connections */
        conn->cm_id = cm_id;
        cm_id->context = conn;
        conn->dev = dev;

        iser_conn_login_phase_set(conn, LOGIN_PHASE_START);

        iser_sched_init_event(&conn->sched_tx, iser_sched_tx, conn);
        iser_sched_init_event(&conn->sched_rdma_rd, iser_sched_rdma_rd, conn);
        iser_sched_init_event(&conn->sched_iosubmit, iser_sched_iosubmit, conn);
        iser_sched_init_event(&conn->sched_buf_alloc, iser_sched_buf_alloc, conn);
        iser_sched_init_event(&conn->sched_post_recv, iser_sched_post_recv, conn);
        iser_sched_init_event(&conn->sched_conn_free, iser_sched_conn_free, conn);

        /* initiator is dst, target is src */
        memcpy(&conn->peer_addr, &cm_id->route.addr.dst_addr, sizeof(conn->peer_addr));
        memcpy(&conn->h.peer, &cm_id->route.addr.dst_addr, sizeof(conn->h.peer));
        ret = iser_get_host_name(&conn->peer_addr, &conn->peer_name);
        if (ret)
                conn->peer_name = strdup("Unresolved");

        memcpy(&conn->self_addr, &cm_id->route.addr.src_addr, sizeof(conn->self_addr));
        memcpy(&conn->h.self, &cm_id->route.addr.src_addr, sizeof(conn->h.self));
        ret = iser_get_host_name(&conn->self_addr, &conn->self_name);
        if (ret)
                conn->self_name = strdup("Unresolved");

        DINFO("iser conn from:%s to:%s\n", conn->peer_name, conn->self_name)

        ret = iser_rdma_create_qp(cm_id, conn);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        conn->qp_hndl = cm_id->qp;
        DINFO("conn:%p cm_id:%p, created qp:%p\n", conn, cm_id,
                conn->qp_hndl);

        /*
        * Post buffers for the login phase, only.
        */
        conn->rsize =
                sizeof(struct iser_hdr) +
                sizeof(struct iscsi_hdr) +
                sizeof(struct iscsi_cdb_ahdr) +
                sizeof(struct iscsi_rlength_ahdr) + 8192;

        conn->ssize = conn->rsize;
        ret = iser_conn_alloc_login_resources(conn);
        if (ret)
                GOTO(err_ret, ret);

        conn_param.initiator_depth = dev->device_attr.max_qp_init_rd_atom;
        if (conn_param.initiator_depth > ev->param.conn.initiator_depth)
                conn_param.initiator_depth = ev->param.conn.initiator_depth;

        /* now we can actually accept the connection */
        ret = iser_rdma_accept(conn->cm_id, &conn_param);
        if (ret) {
                DERROR("conn:%p cm_id:%p rdma_accept failed, %m\n",
                        conn, cm_id);
                GOTO(err_ret, ret);
        }

        /* Increment reference count to be able to wait for TIMEWAIT_EXIT
         * when finalizing the disconnect process */
        iser_conn_get(conn);

        conn->h.tp = &iscsi_iser;

        conn->h.state = STATE_START;
        DINFO("conn:%p cm_id:%p, %s -> %s, accepted\n",
                conn, cm_id, conn->peer_name, conn->self_name);

        return 0;

err_ret:
        ret = iser_rdma_reject(cm_id);
        if (ret)
                DERROR("cm_id:%p rdma_reject failed, %m\n", cm_id);

        if (conn)
                iser_conn_free(conn);

        return ret;
}

/* va_list trampoline used by core_request() to run the connect-request
 * handler on the owning core. */
static int __iser_cm_connect_request(va_list ap)
{
        struct rdma_cm_event *ev = va_arg(ap, struct rdma_cm_event *);
        core_t *core = va_arg(ap, core_t *);
        int err;

        err = __iser_cm_connect_request__(ev, core);
        if (unlikely(err))
                GOTO(err_ret, err);

        return 0;
err_ret:
        return err;
}

void iser_cm_connect_request(struct rdma_cm_event *ev, void *core)
{
        if (core) {
                core_request(((core_t *)core)->hash, -1, "iser_cm_connect_request",
                                __iser_cm_connect_request, ev, core);
        } else {
                __iser_cm_connect_request__(ev, core);
        }
}

/*
 * Finish putting the connection together, now that the other side
 * has ACKed our acceptance.  Moves it from the temp_conn to the
 * iser_conn_list.
 *
 * Release the temporary conn_info and glue it into iser_conn_list.
 */
/*
 * Handle RDMA_CM_EVENT_ESTABLISHED.  On the first event the conn moves
 * START -> READY; if the event arrives while already READY, a login
 * request was received before establishment and its processing was
 * delayed — run it now (scheduled on-core, or inline without a core).
 *
 * NOTE(review): the inline path drops the refcount before calling
 * iser_login_rx, while the scheduled path leaves the increment for the
 * task context — confirm the scheduled task releases it.
 */
static int __iser_cm_conn_established__(struct rdma_cm_event *ev, core_t *core)
{
        struct rdma_cm_id *cm_id = ev->id;
        struct iser_conn *conn = cm_id->context;

        if (conn->h.state == STATE_START) {
                conn->h.state = STATE_READY;
                DINFO("conn:%p cm_id:%p, %s -> %s, established\n",
                        conn, cm_id, conn->peer_name, conn->self_name);
                return 0;
        }

        if (conn->h.state != STATE_READY)
                return 0;

        DINFO("conn:%p cm_id:%p, %s -> %s, "
                "execute delayed login_rx now\n",
                conn, cm_id, conn->peer_name, conn->self_name);
        conn->h.refcount ++;
        if (core) {
                schedule_task_new("iser login rx", iser_login_rx, (void *)conn->login_rx_task, -1);
        } else {
                conn->h.refcount --;
                YASSERT(conn->h.refcount >= 0);
                iser_login_rx((void *)conn->login_rx_task);
        }

        return 0;
}

/* va_list trampoline used by core_request() to run the established
 * handler on the owning core. */
static int __iser_cm_conn_established(va_list ap)
{
        struct rdma_cm_event *ev = va_arg(ap, struct rdma_cm_event *);
        core_t *core = va_arg(ap, core_t *);
        int err;

        err = __iser_cm_conn_established__(ev, core);
        if (unlikely(err))
                GOTO(err_ret, err);

        return 0;
err_ret:
        return err;
}

void iser_cm_conn_established(struct rdma_cm_event *ev, void *core)
{
        if (core) {
                core_request(((core_t *)core)->hash, -1, "iser_cm_conn_established",
                                __iser_cm_conn_established, ev, core);
        } else {
                __iser_cm_conn_established__(ev, core);
        }
}
/*
 * Handle RDMA_CM_EVENT_DISCONNECTED or an equivalent event.
 * Start closing the target's side connection.
 */
static int __iser_cm_disconnected__(struct rdma_cm_event *ev)
{
        struct rdma_cm_id *cm_id = ev->id;
        struct iser_conn *conn = cm_id->context;
        enum rdma_cm_event_type ev_type = ev->event;

        DINFO("conn:%p cm_id:%p event:%d, %s\n", conn, cm_id,
                ev_type, rdma_event_str(ev_type));
        iser_conn_close(conn);

        return 0;
}

/* va_list trampoline used by core_request() to run the disconnect
 * handler on the owning core. */
static int __iser_cm_disconnected(va_list ap)
{
        struct rdma_cm_event *ev = va_arg(ap, struct rdma_cm_event *);
        int err;

        err = __iser_cm_disconnected__(ev);
        if (unlikely(err))
                GOTO(err_ret, err);

        return 0;
err_ret:
        return err;
}

void iser_cm_disconnected(struct rdma_cm_event *ev, void *core)
{
        if (core) {
                core_request(((core_t *)core)->hash, -1, "iser_cm_disconnected",
                                __iser_cm_disconnected, ev);
        } else {
                __iser_cm_disconnected__(ev);
        }
}
/*
 * Handle RDMA_CM_EVENT_TIMEWAIT_EXIT which is expected to be the last
 * event during the lifecycle of a connection, when it had been shut down
 * and the network has cleared from the remaining in-flight messages.
 */
/*
 * Handle RDMA_CM_EVENT_TIMEWAIT_EXIT — expected to be the last event in
 * a connection's lifecycle, once it has been shut down and in-flight
 * messages have drained.  Cleans stuck tx tasks and drops the reference
 * taken at accept time, which typically frees the connection.
 */
static int __iser_cm_timewait_exit__(struct rdma_cm_event *ev)
{
        struct rdma_cm_id *cm_id = ev->id;
        struct iser_conn *conn = cm_id->context;

        /* NOTE(review): logged at error level although this is the normal
         * teardown path — presumably for visibility; consider DINFO. */
        DERROR("conn:%p refcnt:%d cm_id:%p\n",
                conn, conn->h.refcount, cm_id);

        if (conn->h.state == STATE_CLOSED) {
                /*
                 * Tasks sitting in the conn->tx_list are stuck there after we
                 * close the conn and since each holds a reference on the conn
                 * we need to clean them up explicitly now.  Otherwise they will
                 * prevent the conn from being cleaned up (since the refcount
                 * won't reach zero).  If the conn doesn't get cleaned up, then
                 * along with leaking the conn itself (and all its resources),
                 * we'll also leak any rdma_buf's associated with the tasks.
                 * Since the rdma_buf pool is associated with the iser_device
                 * and so gets reused when a new iser_conn connection is
                 * established, leaking too many of those bufs will eventually
                 * clear the pool.
                 */
                iser_ib_clear_tx_list(conn);
                DERROR("conn:%p refcnt:%d cm_id:%p (after cleanup)\n",
                        conn, conn->h.refcount, cm_id);
        }

        /* Refcount was incremented just before accepting the connection,
         * typically this is the last decrement and the connection will be
         * released instantly */
        iser_conn_put(conn);

        return 0;
}

/* va_list trampoline used by core_request() to run the timewait-exit
 * handler on the owning core. */
static int __iser_cm_timewait_exit(va_list ap)
{
        struct rdma_cm_event *ev = va_arg(ap, struct rdma_cm_event *);
        int err;

        err = __iser_cm_timewait_exit__(ev);
        if (unlikely(err))
                GOTO(err_ret, err);

        return 0;
err_ret:
        return err;
}

void iser_cm_timewait_exit(struct rdma_cm_event *ev, void *core)
{
        if (core) {
                core_request(((core_t *)core)->hash, -1, "iser_cm_timewait_exit",
                                __iser_cm_timewait_exit, ev);
        } else {
                __iser_cm_timewait_exit__(ev);
        }
}

/*
 * Fetch one rdma_cm event from the event channel and dispatch it to the
 * matching connection-lifecycle handler, then ack it.  Unsupported and
 * active-side events are logged and ignored.
 *
 * Fix: the per-event DINFO used to label every CM event "UD-related",
 * which is only true for the multicast cases handled further down.
 */
void iser_handle_rdmacm(void *core)
{
        int ret;
        struct rdma_cm_event *ev;
        enum rdma_cm_event_type ev_type;

        ret = iser_rdma_get_event(&ev, core);
        if (ret)
                GOTO(err_ret, ret);

        ev_type = ev->event;
        DINFO("iser cm event:%d, %s\n", ev_type,
                rdma_event_str(ev_type));
        switch (ev_type) {
        case RDMA_CM_EVENT_CONNECT_REQUEST:
                iser_cm_connect_request(ev, core);
                break;

        case RDMA_CM_EVENT_ESTABLISHED:
                iser_cm_conn_established(ev, core);
                break;

        case RDMA_CM_EVENT_CONNECT_ERROR:
        case RDMA_CM_EVENT_REJECTED:
        case RDMA_CM_EVENT_ADDR_CHANGE:
        case RDMA_CM_EVENT_DISCONNECTED:
                iser_cm_disconnected(ev, core);
                break;

        case RDMA_CM_EVENT_TIMEWAIT_EXIT:
                iser_cm_timewait_exit(ev, core);
                break;

        case RDMA_CM_EVENT_MULTICAST_JOIN:
        case RDMA_CM_EVENT_MULTICAST_ERROR:
                DERROR("UD-related event:%d, %s - ignored\n", ev_type,
                        rdma_event_str(ev_type));
                break;

        case RDMA_CM_EVENT_DEVICE_REMOVAL:
                DERROR("Unsupported event:%d, %s - ignored\n", ev_type,
                        rdma_event_str(ev_type));
                break;

        case RDMA_CM_EVENT_ADDR_RESOLVED:
        case RDMA_CM_EVENT_ADDR_ERROR:
        case RDMA_CM_EVENT_ROUTE_RESOLVED:
        case RDMA_CM_EVENT_ROUTE_ERROR:
        case RDMA_CM_EVENT_CONNECT_RESPONSE:
        case RDMA_CM_EVENT_UNREACHABLE:
                /* these only occur on the active (initiator) side */
                DERROR("Active side event:%d, %s - ignored\n", ev_type,
                        rdma_event_str(ev_type));
                break;

        default:
                DERROR("Illegal event:%d - ignored\n", ev_type);
                break;
        }

        /* every event obtained from the channel must be acked */
        ret = iser_rdma_ack_event(ev);
        if (ret)
                DERROR("ack cm event failed, %s\n", rdma_event_str(ev_type));

        return;
err_ret:
        DERROR("failed\n");
        return;
}

/*
 * Poll-loop body for the iser transport: run pending scheduled events,
 * drain completions from the device CQ, then run any events queued by
 * the completion handlers.
 *
 * @hash is unused; @dev is the struct iser_device * to poll (may be NULL
 * before the device exists, or while the server is shutting down).
 */
void IO_FUNC iser_polling(int hash, void *dev)
{
        (void)hash;

        iser_exec_scheduled();

        /* 256: max completions drained per poll round */
        if (likely(dev && srv_running))
                iser_poll_cq((struct iser_device *)dev, 256);

        iser_exec_scheduled();
}

/*
 * Init entire iscsi transport.  Begin listening for connections.
 */
/*
 * Init entire iscsi transport.  Begin listening for connections.
 *
 * Creates the rdma_cm event channel, adds a listening portal for every
 * local address on @port and registers the channel fd with the event
 * loop.  On success *@_fd receives the channel fd.
 *
 * Fix: getaddrinfo() returns EAI_* codes and does NOT set errno, so the
 * old "ret = errno" could be 0 and make the function return success on
 * failure (and "%m" printed a stale error).  Log via gai_strerror() and
 * return a guaranteed-nonzero errno-style code instead.
 *
 * Returns 0 on success, error code otherwise.
 */
int iser_init(int port, int *_fd, void *core)
{
        int ret, fd;
        struct addrinfo hints, *res, *res0;
        char servname[MAX_NAME_LEN];

        if (core) {
                iser_dev_init();
                iser_sched_init();
        }

        ret = iser_rdma_create_channel(&fd, core);
        if (unlikely(ret)) {
                DERROR("Failed to initialize RDMA; load kernel modules?\n");
                GOTO(err_ret, ret);
        }

        memset(servname, 0, sizeof(servname));
        snprintf(servname, sizeof(servname), "%d", port);

        memset(&hints, 0, sizeof(hints));
        hints.ai_socktype = SOCK_STREAM;
        hints.ai_flags = AI_PASSIVE;

        ret = getaddrinfo(NULL, servname, &hints, &res0);
        if (unlikely(ret)) {
                DERROR("unable to get address info, %s\n", gai_strerror(ret));
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        /* one listening portal per local address */
        for (res = res0; res; res = res->ai_next) {
                ret = iser_rdma_add_portal(res, port, core);
                if (unlikely(ret)) {
                        freeaddrinfo(res0);
                        GOTO(err_ret, ret);
                }
        }

        freeaddrinfo(res0);

        ret = rdma_event_add(fd, ISER_EV_FD, EPOLLIN, rdma_handle_event, NULL, core);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (core) {
                DINFO("listening for iser connections on core[%d] port %d, fd %d\n",
                      ((core_t *)core)->hash, port, fd);
        } else {
                DINFO("listening for iser connections on port %d, fd %d\n", port, fd);
        }

        *_fd = fd;

        return 0;
err_ret:
        return ret;
}

/* iser transport operations; installed as conn->h.tp when a connection
 * is accepted in __iser_cm_connect_request__(). */
static struct iscsi_transport iscsi_iser = {
        .name                   = "iser",
        .rdma                   = 1,
        .data_padding           = 1,
        .ep_show                = iser_conn_show,
        .ep_force_close         = iser_conn_force_close,
        .ep_getsockname         = iser_conn_getsockname,
        .ep_getpeername         = iser_conn_getpeername,
};

